Cloud Data Fusion の HTTPプラグイン を使って BigQuery にデータを投入してみた
こんにちは!エノカワです。
Google Cloud の データ統合サービスである Cloud Data Fusion は、 データパイプラインを迅速に構築・管理するためのフルマネージドなサービスです。
インフラストラクチャを管理することなく、ノンコーディングでデータパイプラインを構築できるということで、
「手軽に外部データをBigQueryテーブルに投入なんかもできるのかな?」と気になっていました。
そこで今回は、Webサービスから取得したデータをBigQueryテーブルに投入することを目標に Cloud Data Fusion を触ってみました。
概要
Cloud Data Fusion では、ノードを接続してパイプラインを構成します。
今回作成するパイプラインはこんな感じです。
HTTPノードでWebサービスからデータを取得し、BigQueryノードでBigQueryテーブルにデータを保存するパイプラインを作成します。
データ形式
シンプルなHTTPリクエスト&レスポンスで試したかったところに、うってつけのサービスがありましたので利用させていただきました!
HTTPリクエストをすると様々な形式のレスポンスを返してくれるWebサービスです。
例えば、下記URLにリクエストするとJSON形式のレスポンスを返してくれます。
https://httpbin.org/json
{ "slideshow": { "author": "Yours Truly", "date": "date of publication", "slides": [ { "title": "Wake up to WonderWidgets!", "type": "all" }, { "items": [ "Why <em>WonderWidgets</em> are great", "Who <em>buys</em> WonderWidgets" ], "title": "Overview", "type": "all" } ], "title": "Sample Slide Show" } }
今回は上記JSON形式のレスポンスボディをBigQueryテーブルに投入することを目指します。
インスタンス作成
まずは、Cloud Data Fusion のインスタンスを作成します。
メニューからData Fusion
を選択します。
Cloud Data Fusion API
が有効になっていれば、インスタンスページが表示されます。
Cloud Data Fusion API
が有効になっていない場合は、有効にする
ボタンをクリックしてAPIを有効にします。
インスタンスを作成
をクリックします。
インスタンス作成画面が表示されるので、作成するインスタンスの情報を入力します。
今回は以下の情報を入力しました。
- インスタンス名
fusion-training
- リージョン
asia-northeast1
を選択 - バージョン
6.6.0
を選択 - エディション
Basic
を選択
Cloud Data Fusion は、パイプラインの実行にDataproc クラスタを使用します。
Dataproc クラスタでパイプラインをプロビジョニングおよび実行できる権限を持つサービスアカウントをDataproc サービス アカウント
に指定する必要があります。
デフォルトでは Compute Engine アカウントがあらかじめ選択されていますが、任意のサービスアカウントを指定できます。
必要な権限が足りない場合は画像にあるような警告メッセージが表示されますが、ここで権限を付与できます。
今回はデフォルトの Compute Engine アカウントを指定することにして、必要な権限を付与しましょう。
権限を付与
ボタンをクリックします。
権限を付与
ボタンをクリック後、「IAM」のページを見に行ってみると以下の権限が付与されていました。
- Cloud Data Fusion 実行者
- Dataproc ワーカー
インスタンス情報の入力が完了したので、作成
ボタンをクリックします。
ちなみに、詳細オプションでは以下を設定できます。
今回はデフォルト(すべてOFF)のまま作成しました。
- プライベートIPを有効化
- Stackdriver Logging サービスを有効にする
- Stackdriver Monitoring サービスを有効にする
- 顧客管理の暗号鍵(CMEK)を使用する
作成
ボタンをクリック後、しばらくすると、インスタンスが作成されました。
インスタンスを表示
の箇所をクリックします。
これで準備が整いました。パイラインの作成に入っていきましょう!
パイプライン作成
Cloud Data Fusion のホーム画面が表示されました。
メニューからStudio
を選択します。
Pipeline Studio のページに切り替わりました。
このページで、ソース、変換、シンクなどのノードを接続することで、パイプラインを形成できます。
今回は画面左のSource
からHTTPノード、Sink
からBigQueryノードを選択して Pipeline Studio に配置し、それらを接続してパイプラインを形成します。
デフォルトではSource
にHTTPノードは無いようです。
HTTPプラグイン展開
Cloud Data Fusion には機能を拡張するために使用できるカスタマイズ可能なモジュールとして、プラグインがあります。
画面右上のHUB
をクリックするとコンポーネントの一覧が表示されるので、ここからHTTPプラグインを探します。
検索バーにHTTP
と入力すると、HTTPプラグインが見つかりました。
HTTPプラグインを選択し、Deploy
ボタンをクリックします。
デプロイ画面が表示されるので、Finish
ボタンをクリックします。
HTTPプラグインのデプロイが完了しました。
続けてパイプラインの作成を行いますので、Create a pipeline
ボタンをクリックします。
HTTPノード設定
画面左のSource
内にHTTPノードが追加されています。
選択すると、HTTPノードが Pipeline Studio に配置されました。
ソースとしてのHTTPノードが配置されましたので、httpbinにリクエストする設定をしていきましょう。
HTTPノードのProperties
をクリックします。
HTTPノードの設定画面が表示されました。
左側はHTTP設定、右側は出力データの形式です。
HTTP設定
識別名、リクエストURLを入力、HTTPメソッドを選択します。
今回は以下の情報を入力しました。
- Reference Name
from-httpbin
- URL
https://httpbin.org/json
- HTTP Method
GET
を選択
続けてHTTPレスポンスのフォーマットを入力します。
データ形式がJSONの場合、JSONパスとフィールドマッピングも指定できるので、レスポンスボディの内容をもとにマッピングします。
- Format
json
- JSON/MXL Result Path
/slideshow
- JSON/XML Fields Mapping
Field Name Field Path author /author date /date slides /slides title /title
{ "slideshow": { "author": "Yours Truly", "date": "date of publication", "slides": [ { "title": "Wake up to WonderWidgets!", "type": "all" }, { "items": [ "Why <em>WonderWidgets</em> are great", "Who <em>buys</em> WonderWidgets" ], "title": "Overview", "type": "all" } ], "title": "Sample Slide Show" } }
出力データ構造
Output Schema
に出力データ構造を入力します。
マッピングしたフィールドをもとにスキーマを構成します。
このデータ構造がBigQueryノードに渡されます。
BigQueryノード設定
次にBigQueryノードを設定します。
画面左のSink
からBigQueryを選択して、 Pipeline Studio にBigQueryノードを配置します。
HTTPノードから矢印を引っ張ってBigQueryノードに接続します。
BigQueryノードのProperties
をクリックします。
BigQueryノードの設定画面が表示されました。
左側は入力データ構造、右側はBigQueryテーブルの情報です。
入力データ構造
Input Schema
に入力データ構造が表示されています。編集はできません。
接続されたHTTPノードから渡されるデータ構造がそのまま入力データ構造となります。
テーブル設定
識別名、データセット名、テーブル名を入力します。
今回は以下の情報を入力しました。
- Reference Name
to-bigquery
- Dataset
df_work
- Table
httpbin_json
出力データ構造
Output Schema
にはBigQueryテーブルのスキーマ情報を入力します。
デフォルトで入力データ構造と同じ値が設定されていました。
今回はHTTPノードから渡されるデータをそのまま投入するので、変更は加えず入力データ構造と同じ設定のままとしました。
パイプライン確認
Cloud Data Fusionにはプレビュー機能があります。
作成したパイプラインが正しく動作するか確認してみましょう。
画面右上のPreview
をクリックすると、プレビューモードになります。
その状態で続けてRun
をクリックします。
パイプラインのプレビュー実行が開始されました。
画面上部にメッセージが表示されました。
プレビュー実行は正常に完了したようです。
パイプライン展開
プレビュー実行でパイプラインに問題無いことが確認できたので、パイプラインをデプロイします。
デプロイするためにはパイプラインを保存する必要があります。
Pipeline Studio 左上のName your pipeline
の箇所をクリックします。
任意のパイプライン名を入力し、Save
ボタンをクリックします。
今回はhttp-to-bigqueryとしました。
パイプラインが保存できたので、デプロイします。
Pipeline Studio 右上のDeploy
をクリックします。
パイプラインのデプロイが開始されました。
しばらくすると、画面が切り替わりました。
画面上部のStatusにDeployed
と表示されています。
パイプラインのデプロイが完了したようです。
パイプライン実行
それではパイプラインを実行してみましょう。
Pipeline Studio 右上のRun
をクリックします。
StatusがProvisioning
に変わりました。
時間経過とともにStatusが変わっていきます。
Status: Provisioning
Status: Running
Status: Succeeded
パイプラインの実行が正常に完了したようです。
Dataproc クラスタ
パイプライン実行の裏では、Datapoc クラスタが以下の変遷をたどっていました。
Status: Provisioning
→プロビジョニング中
Status: Running
→実行中
Status: Succeeded
→クラスタなし
データ確認
最後に、BigQueryテーブルにデータが投入されたか確認してみましょう。
BigQuerydf_work
データセットが作成され、その下にhttpbin_json
テーブルが作成されています。
プレビューでデータを確認すると、データが投入されていました!
httpbin のレスポンスボディの内容がちゃんと入っています。
スキーマも見てみましょう。
BigQueryノードのプロパティで設定したデータ構造でテーブルが作成されています!
まとめ
以上、Webサービスから取得したデータをBigQueryテーブルに投入するパイプラインを Cloud Data Fusion で構築してみました。
取り敢えずパイプランを構築することを目的としていたので、細かい設定は気にせず構築を進めてみましたが、期待動作するパイプラインが出来上がりました。
ノードを配置して接続してプロパティの必須項目を入力して、、という作業がすべてGUI上で完結するので、直感的にパイプライン構築ができました。
また、パイプライン実行時に自動でDataproc クラスタをデプロイ、実行、削除してくれるので、インフラ管理も気にする必要が無い点も実感することができました。
パイプラインのデプロイ前にプレビュー実行できるのも有り難い機能だと思いました!
ノード(プラグイン)も豊富なのでETL領域を広くカバーできそうです。
今回は取得したデータをそのまま保存するシンプルなパイプラインだったので変換(Transform)のノードは使いませんでしたが、次回はその辺りも含めて試してみたいと思います。
参考
- Cloud Data Fusion の概要 | Cloud Data Fusion のドキュメント | Google Cloud
- Cloud Data Fusion インスタンスの作成 | Cloud Data Fusion のドキュメント | Google Cloud
- サービス アカウント ユーザーの権限の付与 | Cloud Data Fusion のドキュメント | Google Cloud
- Cloud Data Fusion サービス アカウント | Cloud Data Fusion のドキュメント | Google Cloud
- Cloud Data Fusion のプラグイン リファレンス | Google Cloud
- HTTP Batch Source - CDAP Documentation - Confluence
- Google BigQuery Table Sink - CDAP Documentation - Confluence
- Cloud Data Fusion: Using HTTP Plugin to batch API Calls | by Justin Taras | Medium